package com.clp.protocol.modbus_tcp.client;

import com.clp.protocol.core.client.FailedToConnectException;
import com.clp.protocol.core.client.NettyClient;
import com.clp.protocol.modbus_tcp.client.async.*;
import com.clp.protocol.modbus_tcp.client.master_config.MasterConnConfig;
import com.clp.protocol.modbus_tcp.client.pipeline.AbstractModbusChannelInitializer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ModbusClient extends MasterContainer implements Closeable {

    private static volatile ModbusClient modbusClient;

    public static ModbusClient get() {
        // 懒加载
        if (modbusClient != null && !modbusClient.isClosed()) {
            return modbusClient;
        }
        synchronized (ModbusClient.class) {
            if (modbusClient != null && !modbusClient.isClosed()) {
                return modbusClient;
            }
            modbusClient = new ModbusClient();
        }
        return modbusClient;
    }

    /**
     * Netty客户端
     */
    private final NettyClient nettyClient;

    /**
     * 基于初始化器创建对应的主站程序
     */
    private ModbusClient() {
        this.nettyClient = new NettyClient();
    }

    public synchronized MasterFuture<Void> connect(MasterConnConfig cfg) {
        // 如果检查配置失败，则连接失败
        try {
            cfg.check();
            // 连接
            final ChannelFuture connFuture = nettyClient.connect(cfg.getInitializer(), cfg.getRemoteHost(), cfg.getRemotePort());
            final MasterImpl masterImpl = new MasterImpl(connFuture.channel(), cfg);
            // 处理结果
            final MasterPromise<Void> connPromise = new DefaultMasterPromise<>(masterImpl);
            connFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (connFuture.isSuccess()) {
                        ModbusClient.this.addMasterIfNotExists(masterImpl);
                        // 启动所有监视作业
                        masterImpl.startMonitorTasks();
                        connPromise.setSuccess();
                    } else {
                        connFuture.channel().close();
                        connPromise.setFailure(future.cause());
                    }
                }
            });
            return connPromise;
        } catch (Throwable t) {
            return new FailedMasterPromise(null, t.getCause());
        }
    }

    public synchronized MasterFuture<Void> reconnect(String remoteHost, int remotePort) {
        return reconnect(getMaster(remoteHost, remotePort));
    }

    public synchronized MasterFuture<Void> reconnect(Master master) {
        if (master == null) {
            log.error("重连失败，该主站不存在！");
            return new FailedMasterPromise(null, new FailedToConnectException("主站不存在"));
        }
        MasterPromise<Void> promise = new DefaultMasterPromise<>(master);

        // 停止所有监视作业
        MasterImpl masterImpl = (MasterImpl) master;
        masterImpl.stopMonitorTasks();
        // 关闭通道
        MasterFuture<Void> closeFuture = masterImpl.closeChannel();
        closeFuture.addListener(new MasterFutureListener<Void>() {
            @Override
            public void operationComplete(MasterFuture<Void> future) {
                if (!closeFuture.isSuccess()) {
                    log.error("通道关闭失败！");
                    promise.setFailure(closeFuture.cause());
                    return;
                }

                // 再连接
                nettyClient.connect(masterImpl.getInitializer(), master.remoteHost(), master.remotePort()).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            log.error("重连" + master.remoteHost() + ": " + master.remotePort() + "失败！");
                            promise.setFailure(new FailedToConnectException("重连失败！"));
                            return;
                        }
                        // 重置主站
                        masterImpl.reset(future.channel());
                        // 启动所有监视作业
                        masterImpl.startMonitorTasks();
                        // TODO 如果从列表中删除了，重新加上
                        ModbusClient.this.addMasterIfNotExists(master);
                        promise.setSuccess();
                    }
                });
            }
        });

        return promise;
    }

    public synchronized ClientFuture<Void> disconnectAll() {
        if (isClosed() || isEmpty()) return new DefaultClientPromise<Void>(this).setSuccess();
        DefaultClientPromise<Void> promise = new DefaultClientPromise<>(this);
        List<MasterFuture<Void>> futures = new ArrayList<>();
        for (Master master : this) {
            futures.add(disconnect(master, true));
        }
        final AtomicInteger count = new AtomicInteger(0);
        futures.forEach(future -> {
            future.addListener(new MasterFutureListener<Void>() {
                @Override
                public void operationComplete(MasterFuture<Void> future) {
                    if (future.isSuccess()) {
                        if (count.incrementAndGet() == futures.size()) {
                            promise.setSuccess();
                        }
                    }
                }
            });
        });
        return promise;
    }

    public synchronized MasterFuture<Void> disconnect(Master master, boolean isClientClose) {
        if (isClosed() || master == null) return new FailedMasterPromise(master, new RuntimeException("客户端已关闭连接或主站不存在！"));
        DefaultMasterPromise<Void> promise = new DefaultMasterPromise<>(master);
        ((MasterImpl) master).closeChannel().addListener(new MasterFutureListener<Void>() {
            @Override
            public void operationComplete(MasterFuture<Void> future) {
                if (future.isSuccess()) {
                    if (!isClientClose) {
                        ModbusClient.this.removeMasterIfExists(master);
                    }
                    promise.setSuccess();
                    return;
                }
                promise.setFailure(new RuntimeException("通道关闭失败！"));
            }
        });
        return promise;
    }

    private boolean isClosed() {
        return nettyClient.isClosed();
    }

    @Override
    public void close() throws IOException {
        if (isClosed()) return;
        disconnectAll().sync();
        nettyClient.close();
    }
}
